package com.jourwon.spring.boot.util;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;

/**
 * Utility component for managing Kafka topics through the injected {@link AdminClient}.
 *
 * <p>All operations block the calling thread until the corresponding Kafka future completes.
 * If the calling thread is interrupted while waiting, the interrupt flag is restored before
 * the method returns or rethrows, so callers can still observe the interruption.
 *
 * @author JourWon
 * @date 2022/3/22
 */
@Slf4j
@Component
public class KafkaTopicUtils {

    /** Partition count used for newly created topics; property {@code spring.kafka.num.partitions}, default 3. */
    @Value("${spring.kafka.num.partitions:3}")
    private Integer numPartitions;

    /** Replication factor used for newly created topics; property {@code spring.kafka.num.replication-factor}, default 1. */
    @Value("${spring.kafka.num.replication-factor:1}")
    private short replicationFactor;

    @Resource
    private AdminClient adminClient;

    /**
     * Lists topic names.
     *
     * <p>Because of {@code @SneakyThrows}, checked exceptions raised while waiting for the
     * result propagate to the caller without being declared.
     *
     * @return a new mutable list holding the names reported by {@code adminClient.listTopics()}
     */
    @SneakyThrows
    public List<String> list() {
        try {
            Set<String> names = adminClient.listTopics().names().get();
            return new ArrayList<>(names);
        } catch (InterruptedException e) {
            // Restore the flag before rethrowing so the interruption is not lost upstream.
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /**
     * Creates a topic with the configured partition count and replication factor.
     *
     * @param topic topic name
     * @return {@code true} if creation completed, {@code false} if waiting for the result failed
     */
    public boolean createTopic(String topic) {
        log.info("创建topic:{}", topic);
        NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
        CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
        try {
            result.all().get();
            return true;
        } catch (Exception e) {
            restoreInterruptFlag(e);
            log.error("创建topic失败:{}", e.getMessage(), e);
            return false;
        }
    }

    /**
     * Deletes a topic.
     *
     * @param topic topic name
     * @return {@code true} if deletion completed, {@code false} if waiting for the result failed
     */
    public boolean deleteTopic(String topic) {
        log.info("删除topic:{}", topic);
        DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList(topic));
        try {
            result.all().get();
            return true;
        } catch (Exception e) {
            restoreInterruptFlag(e);
            log.error("删除topic失败:{}", e.getMessage(), e);
            return false;
        }
    }

    /**
     * Fetches the description of a topic.
     *
     * @param topic topic name
     * @return the topic description, or {@code null} if the result has no entry for
     *         {@code topic} or waiting for the result failed
     */
    public TopicDescription describeTopic(String topic) {
        DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic));
        try {
            // Map.get already yields null for a missing entry, so no separate null check is needed.
            return result.all().get().get(topic);
        } catch (Exception e) {
            restoreInterruptFlag(e);
            log.error("获取topic详情异常:{}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Re-sets the current thread's interrupt flag when the caught exception is an
     * {@link InterruptedException}; does nothing for any other exception.
     *
     * @param e the exception caught while waiting on a Kafka future
     */
    private static void restoreInterruptFlag(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

}
